In this lab, we're going to look at MapReduce with Apache Spark. At the end of the lab, you should be able to: initialise a connection to a Spark cluster using a SparkContext object; load data into a resilient distributed dataset (RDD); and carry out a simple word count using MapReduce operations.
Let's start by importing the packages we'll need. This week we're going to use pyspark, a Python package that wraps Apache Spark and makes its functionality available in Python. We'll also use Python's built-in urllib2 module to load data via HTTP, and the list of English language stop words from scikit-learn.
In [ ]:
import pyspark  # Python interface to Apache Spark
import urllib2  # Load data over HTTP
from sklearn.feature_extraction import text  # Provides the English stop word list (text.ENGLISH_STOP_WORDS)
First, let's initialise a SparkContext object, which will represent our connection to the Spark cluster. To do this, we must first specify the URL of the master node to connect to. As we're only running this notebook for demonstration purposes, we can just run the cluster locally, as follows:
In [ ]:
sc = pyspark.SparkContext(master='local[*]')
Note: By specifying master='local[*]', we are instructing Spark to run with as many worker threads as there are logical cores available on the host machine. Alternatively, we could directly specify the number of threads, e.g. master='local[4]' to run four threads. However, we need to make sure to specify at least two threads, so that there is one available for resource management and at least one available for data processing.
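If you want to check how the context has been configured, you can inspect its master and defaultParallelism attributes:
In [ ]:
sc.master              # e.g. 'local[*]'
sc.defaultParallelism  # Default number of partitions Spark will use (here, the number of logical cores)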
Next, let's load the data. This week, we'll load data from a URL resource. We can do this using Python's built-in urllib2 module. First, let's specify the URL of the resource:
In [ ]:
url = 'https://www.gutenberg.org/files/100/100-0.txt' # The Complete Works of Shakespeare
We can now load the data at the given URL using urllib2 (this might take a little while, depending on the speed of your internet connection), as follows:
In [ ]:
data = urllib2.urlopen(url).read()
data = data.splitlines() # Split the text into a list of lines
data[:10] # Print the first ten lines
We can load variables from our local Python kernel into our Spark cluster using the parallelize method of the SparkContext we have created, like this:
In [ ]:
rdd = sc.parallelize(data) # Create a resilient distributed dataset (RDD) from the data
We can examine the first few entries of the RDD using its take method:
In [ ]:
rdd.take(10) # Take the first ten entries
As can be seen, the data inside the Spark RDD is equivalent to our earlier text data.
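If we want to confirm this, we can compare the number of entries in the RDD with the number of lines we loaded:
In [ ]:
rdd.count(), len(data)  # Both should give the same number of lines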
Let's use Spark to carry out a simple word count on our text data using MapReduce. As our RDD consists of a list of lines, we should first split the lines into words. We can use the flatMap method of the RDD to do this. The flatMap method maps a given function over each element in the RDD (e.g. the lines of the text) and then flattens the result. In our case, we want to split each line of our RDD into a list of words (i.e. map some word-splitting function) and then create a single list of all of the words in the RDD (i.e. flatten the result).
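To see the difference, here's a quick illustration on a small, throwaway RDD (the toy variable below is just for demonstration):
In [ ]:
toy = sc.parallelize(['to be or', 'not to be'])
toy.map(lambda line: line.split()).collect()      # [['to', 'be', 'or'], ['not', 'to', 'be']]
toy.flatMap(lambda line: line.split()).collect()  # ['to', 'be', 'or', 'not', 'to', 'be']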
Once we've got an RDD containing a list of all the words in the text, it might be useful to convert each word to lowercase, so that the word 'Thou' is treated the same as the word 'thou'. We can do this using the map method, which applies a given function to each element in an RDD.
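For example (again, just a throwaway illustration):
In [ ]:
sc.parallelize(['Thou', 'THOU', 'thou']).map(lambda word: word.lower()).collect()  # ['thou', 'thou', 'thou']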
Then, for each word in the RDD, we can create a key-value pair, whose key consists of the word and whose value consists of just the number one, e.g. ('thou', 1). As this is an operation that must be applied to each element in the RDD, we can again use the map method.
Finally, we can apply a function to reduce the values for each key using the reduceByKey method of the RDD. This method iteratively applies a given function to pairs of values that have been generated for a given key until just one value remains. In our case, we want to sum the counts of each individual word, so we can supply the function lambda x, y: x + y.
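Here's what these last two steps look like on a handful of words (again, just a throwaway illustration):
In [ ]:
pairs = sc.parallelize(['thou', 'art', 'thou']).map(lambda word: (word, 1))
pairs.collect()                                  # [('thou', 1), ('art', 1), ('thou', 1)]
pairs.reduceByKey(lambda x, y: x + y).collect()  # [('thou', 2), ('art', 1)] (order may vary)
Putting all of these steps together for our text data: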
In [ ]:
word_counts = (  # Using parentheses allows inline comments like this
    rdd.flatMap(lambda line: line.split())  # Split each line into words, flatten the result
    .map(lambda word: word.lower())  # Make each word lowercase
    .map(lambda word: (word, 1))  # Map each word to a key-value pair
    .reduceByKey(lambda x, y: x + y)  # Reduce pairs of values until just one remains for each key
)
The result of the set of operations above is another RDD:
In [ ]:
word_counts
This is because Spark computations are lazy, i.e. operations are appended to a computation graph, but not carried out until a later point in time. The advantage of this is that we can continue to append operations to an RDD until we are ready to compute the final result. At this point, Spark is able to optimise the calculation, for example by reordering or combining steps, which can reduce the total amount of computation that must be done.
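For example, nothing is actually computed until we call an action such as count or take:
In [ ]:
word_counts.count()  # An action: triggers execution of the pipeline and returns the number of distinct words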
If we need to, we can check the results of the operation using the take method:
In [ ]:
word_counts.take(5)
As can be seen, Spark has generated counts for each of the words in the text. Next, let's sort the word counts according to the most commonly used words. To do this, we can use the sortBy method of the RDD. Like map, sortBy applies a given function to each key-value pair (kvp) in our RDD, and sorts the elements according to the values that function returns.
In our case, we want to sort the RDD according to the value of each key-value pair (i.e. the word count). As a result, we can pass the inline function lambda kvp: kvp[1] to instruct Spark to sort according to the value of each key-value pair (lambda kvp: kvp[0] would sort by the key). We'll also pass the optional keyword argument ascending=False to force Spark to sort the values in descending order (most common words first) rather than ascending order (the default).
In [ ]:
top = word_counts.sortBy(lambda kvp: kvp[1], ascending=False)
top.take(5)
This is good, but the most common words correspond to very commonly used words in English (no surprise!). Let's filter these out using the list of English language stop words included in scikit-learn (recall Lab 05). We can do this using the filter method of the RDD, which filters each entry in the RDD according to whether a given function returns True or False.
In our case, we want to keep only the words that are not in the stop words list, so we can pass the inline function lambda kvp: kvp[0] not in text.ENGLISH_STOP_WORDS, which returns True when kvp[0] (the key / word in the key-value pair) is not in the stop words list and False otherwise.
In [ ]:
top = top.filter(lambda kvp: kvp[0] not in text.ENGLISH_STOP_WORDS)
top.take(10)
If we were processing a very large text document, it would make more sense to perform the filtering at the start of the process, rather than at the end, in order to avoid all the computation involved in mapping and reducing words that we already know are on the stop words list. In such a case, we could simply write the entire algorithm as:
In [ ]:
word_counts = rdd.flatMap(lambda line: line.split()) \
                 .map(lambda word: word.lower()) \
                 .filter(lambda word: word not in text.ENGLISH_STOP_WORDS) \
                 .map(lambda word: (word, 1)) \
                 .reduceByKey(lambda x, y: x + y) \
                 .sortBy(lambda kvp: kvp[1], ascending=False)
word_counts.take(10)
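Finally, once we've finished working with the data, we can shut down our connection to the Spark cluster by stopping the SparkContext:
In [ ]:
sc.stop()  # Release the resources held by the local Spark cluster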